@@ -252,7 +252,7 @@ class Agent < ActiveRecord::Base |
||
| 252 | 252 |
select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
|
| 253 | 253 |
joins("JOIN links ON (links.receiver_id = agents.id)").
|
| 254 | 254 |
joins("JOIN agents AS sources ON (links.source_id = sources.id)").
|
| 255 |
- joins("JOIN events ON (events.agent_id = sources.id)").
|
|
| 255 |
+ joins("JOIN events ON (events.agent_id = sources.id AND events.id > links.event_id_at_creation)").
|
|
| 256 | 256 |
where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql
|
| 257 | 257 |
|
| 258 | 258 |
agents_to_events = {}
|
@@ -4,4 +4,10 @@ class Link < ActiveRecord::Base |
||
| 4 | 4 |
|
| 5 | 5 |
belongs_to :source, :class_name => "Agent", :inverse_of => :links_as_source |
| 6 | 6 |
belongs_to :receiver, :class_name => "Agent", :inverse_of => :links_as_receiver |
| 7 |
+ |
|
| 8 |
+ before_create :store_event_id_at_creation |
|
| 9 |
+ |
|
| 10 |
+ def store_event_id_at_creation |
|
| 11 |
+ self.event_id_at_creation = source.events.limit(1).reorder("id desc").pluck(:id).first || 0
|
|
| 12 |
+ end |
|
| 7 | 13 |
end |
@@ -0,0 +1,18 @@ |
||
| 1 |
+class AddEventIdAtCreationToLinks < ActiveRecord::Migration |
|
| 2 |
+ def up |
|
| 3 |
+ add_column :links, :event_id_at_creation, :integer, :null => false, :default => 0 |
|
| 4 |
+ |
|
| 5 |
+ execute <<-SQL |
|
| 6 |
+ UPDATE #{ActiveRecord::Base.connection.quote_table_name('links')}
|
|
| 7 |
+ SET event_id_at_creation = ( |
|
| 8 |
+ SELECT #{ActiveRecord::Base.connection.quote_column_name('id')}
|
|
| 9 |
+ FROM #{ActiveRecord::Base.connection.quote_table_name('events')}
|
|
| 10 |
+ WHERE events.agent_id = links.source_id ORDER BY events.id DESC limit 1 |
|
| 11 |
+ ) |
|
| 12 |
+ SQL |
|
| 13 |
+ end |
|
| 14 |
+ |
|
| 15 |
+ def down |
|
| 16 |
+ remove_column :links, :event_id_at_creation |
|
| 17 |
+ end |
|
| 18 |
+end |
@@ -11,7 +11,7 @@ |
||
| 11 | 11 |
# |
| 12 | 12 |
# It's strongly recommended to check this file into your version control system. |
| 13 | 13 |
|
| 14 |
-ActiveRecord::Schema.define(:version => 20140210062747) do |
|
| 14 |
+ActiveRecord::Schema.define(:version => 20140213053001) do |
|
| 15 | 15 |
|
| 16 | 16 |
create_table "agent_logs", :force => true do |t| |
| 17 | 17 |
t.integer "agent_id", :null => false |
@@ -88,8 +88,9 @@ ActiveRecord::Schema.define(:version => 20140210062747) do |
||
| 88 | 88 |
create_table "links", :force => true do |t| |
| 89 | 89 |
t.integer "source_id" |
| 90 | 90 |
t.integer "receiver_id" |
| 91 |
- t.datetime "created_at", :null => false |
|
| 92 |
- t.datetime "updated_at", :null => false |
|
| 91 |
+ t.datetime "created_at", :null => false |
|
| 92 |
+ t.datetime "updated_at", :null => false |
|
| 93 |
+ t.integer "event_id_at_creation", :default => 0, :null => false |
|
| 93 | 94 |
end |
| 94 | 95 |
|
| 95 | 96 |
add_index "links", ["receiver_id", "source_id"], :name => "index_links_on_receiver_id_and_source_id" |
@@ -221,8 +221,13 @@ describe Agent do |
||
| 221 | 221 |
it "should track when events have been seen and not received them again" do |
| 222 | 222 |
mock.any_instance_of(Agents::TriggerAgent).receive(anything).once |
| 223 | 223 |
Agent.async_check(agents(:bob_weather_agent).id) |
| 224 |
- Agent.receive! |
|
| 225 |
- Agent.receive! |
|
| 224 |
+ lambda {
|
|
| 225 |
+ Agent.receive! |
|
| 226 |
+ }.should change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id }
|
|
| 227 |
+ |
|
| 228 |
+ lambda {
|
|
| 229 |
+ Agent.receive! |
|
| 230 |
+ }.should_not change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id }
|
|
| 226 | 231 |
end |
| 227 | 232 |
|
| 228 | 233 |
it "should not run consumers that have nothing to do" do |
@@ -238,6 +243,39 @@ describe Agent do |
||
| 238 | 243 |
Agent.async_check(agents(:jane_weather_agent).id) |
| 239 | 244 |
Agent.receive! |
| 240 | 245 |
end |
| 246 |
+ |
|
| 247 |
+ it "should ignore events that were created before a particular Link" do |
|
| 248 |
+ agent2 = Agents::SomethingSource.new(:name => "something") |
|
| 249 |
+ agent2.user = users(:bob) |
|
| 250 |
+ agent2.save! |
|
| 251 |
+ agent2.check |
|
| 252 |
+ |
|
| 253 |
+ mock.any_instance_of(Agents::TriggerAgent).receive(anything).twice |
|
| 254 |
+ agents(:bob_weather_agent).check # bob_weather_agent makes an event |
|
| 255 |
+ |
|
| 256 |
+ lambda {
|
|
| 257 |
+ Agent.receive! # event gets propagated |
|
| 258 |
+ }.should change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id }
|
|
| 259 |
+ |
|
| 260 |
+ # This agent creates a few events before we link to it, but after our last check. |
|
| 261 |
+ agent2.check |
|
| 262 |
+ agent2.check |
|
| 263 |
+ |
|
| 264 |
+ # Now we link to it. |
|
| 265 |
+ agents(:bob_rain_notifier_agent).sources << agent2 |
|
| 266 |
+ agent2.links_as_source.first.event_id_at_creation.should == agent2.events.reorder("events.id desc").first.id
|
|
| 267 |
+ |
|
| 268 |
+ lambda {
|
|
| 269 |
+ Agent.receive! # but we don't receive those events because they're too old |
|
| 270 |
+ }.should_not change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id }
|
|
| 271 |
+ |
|
| 272 |
+ # Now a new event is created by agent2 |
|
| 273 |
+ agent2.check |
|
| 274 |
+ |
|
| 275 |
+ lambda {
|
|
| 276 |
+ Agent.receive! # and we receive it |
|
| 277 |
+ }.should change { agents(:bob_rain_notifier_agent).reload.last_checked_event_id }
|
|
| 278 |
+ end |
|
| 241 | 279 |
end |
| 242 | 280 |
|
| 243 | 281 |
describe "creating a new agent and then calling .receive!" do |